/**
 * @file kafka_error_callback_example.cpp
 * @brief Kafka错误回调函数使用示例
 * @author Enterprise Development Team
 * @date 2025/7/2
 * 
 * 展示如何在实际项目中使用Kafka错误回调函数
 */

#include "common/kafka/kafka_error_handler.h"
#include "common/logger/logger.h"
#include <iostream>
#include <thread>
#include <chrono>

// 模拟Kafka生产者类
class MockKafkaProducer {
private:
    std::function<void(const std::string&)> errorCallback_;
    
public:
    void setErrorCallback(std::function<void(const std::string&)> callback) {
        errorCallback_ = callback;
    }
    
    // 模拟触发错误
    void simulateError(const std::string& errorMessage) {
        if (errorCallback_) {
            errorCallback_(errorMessage);
        }
    }
    
    void send(const std::string& message) {
        // 模拟发送消息，可能触发错误
        std::cout << "发送消息: " << message << std::endl;
    }
};

/**
 * @brief 示例1: 基本使用
 */
void example1_basic_usage() {
    std::cout << "\n=== 示例1: 基本使用 ===" << std::endl;
    
    // 创建Kafka生产者
    MockKafkaProducer producer;
    
    // 创建标准错误回调
    auto errorCallback = kafka::createKafkaErrorCallback();
    
    // 设置错误回调
    producer.setErrorCallback(errorCallback);
    
    // 模拟各种错误
    producer.simulateError("Connection failed to broker localhost:9092");
    producer.simulateError("Authentication failed: Invalid credentials");
    producer.simulateError("Topic 'user-events' does not exist");
    producer.simulateError("Request timeout after 30000ms");
    
    // 查看错误统计
    std::cout << "\n错误统计:\n" << kafka::getKafkaErrorStatistics() << std::endl;
}

/**
 * @brief 示例2: 自定义错误处理
 */
void example2_custom_handling() {
    std::cout << "\n=== 示例2: 自定义错误处理 ===" << std::endl;
    
    MockKafkaProducer producer;
    
    // 创建自定义错误处理逻辑
    auto customHandler = [](const std::string& errorMessage) {
        std::cout << "🔧 自定义处理: " << errorMessage << std::endl;
        
        // 业务特定的错误处理
        if (errorMessage.find("authentication") != std::string::npos) {
            std::cout << "  → 触发重新认证流程" << std::endl;
            // 这里可以调用重新认证的逻辑
        } else if (errorMessage.find("connection") != std::string::npos) {
            std::cout << "  → 触发连接重试机制" << std::endl;
            // 这里可以调用连接重试的逻辑
        } else if (errorMessage.find("topic") != std::string::npos) {
            std::cout << "  → 检查Topic配置" << std::endl;
            // 这里可以调用Topic检查的逻辑
        }
        
        // 更新业务指标
        std::cout << "  → 更新业务错误指标" << std::endl;
    };
    
    // 创建带自定义逻辑的错误回调
    auto errorCallback = kafka::createCustomKafkaErrorCallback(customHandler);
    producer.setErrorCallback(errorCallback);
    
    // 模拟错误
    producer.simulateError("SASL authentication failed");
    producer.simulateError("Network connection timeout");
    producer.simulateError("Topic 'orders' not found");
}

/**
 * @brief 示例3: 错误频率监控
 */
void example3_error_frequency_monitoring() {
    std::cout << "\n=== 示例3: 错误频率监控 ===" << std::endl;
    
    MockKafkaProducer producer;
    auto errorCallback = kafka::createKafkaErrorCallback();
    producer.setErrorCallback(errorCallback);
    
    // 重置统计
    kafka::resetKafkaErrorStatistics();
    
    // 模拟高频错误
    std::cout << "模拟高频错误场景..." << std::endl;
    for (int i = 0; i < 15; ++i) {
        producer.simulateError("Broker connection failed (attempt " + std::to_string(i + 1) + ")");
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    
    // 查看统计
    std::cout << "\n高频错误后的统计:\n" << kafka::getKafkaErrorStatistics() << std::endl;
}

/**
 * @brief 示例4: 不同严重级别的错误
 */
void example4_different_severity_levels() {
    std::cout << "\n=== 示例4: 不同严重级别的错误 ===" << std::endl;
    
    MockKafkaProducer producer;
    auto errorCallback = kafka::createKafkaErrorCallback();
    producer.setErrorCallback(errorCallback);
    
    // 重置统计
    kafka::resetKafkaErrorStatistics();
    
    std::cout << "模拟不同严重级别的错误..." << std::endl;
    
    // 严重错误
    producer.simulateError("CRITICAL: Authentication system failure");
    producer.simulateError("FATAL: Configuration error - invalid broker list");
    
    // 高级别错误
    producer.simulateError("Broker cluster is unavailable");
    producer.simulateError("Network partition detected");
    
    // 中等级别错误
    producer.simulateError("Topic partition leader not available");
    producer.simulateError("Message size exceeds limit");
    
    // 低级别错误
    producer.simulateError("Request timeout - will retry");
    producer.simulateError("Temporary network delay");
    
    std::cout << "\n不同严重级别错误的统计:\n" << kafka::getKafkaErrorStatistics() << std::endl;
}

/**
 * @brief 示例5: 生产环境集成示例
 */
void example5_production_integration() {
    std::cout << "\n=== 示例5: 生产环境集成示例 ===" << std::endl;
    
    MockKafkaProducer producer;
    
    // 生产环境的错误处理逻辑
    auto productionHandler = [](const std::string& errorMessage) {
        // 1. 记录到业务日志
        LOG_ERROR("Kafka生产者错误: " + errorMessage );
        
        // 2. 更新监控指标
        // metricsCollector.incrementCounter("kafka.producer.errors");
        std::cout << "📊 更新监控指标: kafka.producer.errors +1" << std::endl;
        
        // 3. 根据错误类型采取行动
        if (errorMessage.find("authentication") != std::string::npos) {
            // 触发重新认证
            std::cout << "🔐 触发Kafka重新认证流程" << std::endl;
            // authManager.reauthenticateKafka();
        }
        
        if (errorMessage.find("critical") != std::string::npos || 
            errorMessage.find("fatal") != std::string::npos) {
            // 发送紧急告警
            std::cout << "🚨 发送紧急告警到运维团队" << std::endl;
            // alertSystem.sendUrgentAlert("Kafka Critical Error", errorMessage);
        }
        
        // 4. 业务降级处理
        if (errorMessage.find("timeout") != std::string::npos) {
            std::cout << "⏰ 启用消息缓存机制" << std::endl;
            // messageCache.enableCaching();
        }
    };
    
    auto errorCallback = kafka::createCustomKafkaErrorCallback(productionHandler);
    producer.setErrorCallback(errorCallback);
    
    // 模拟生产环境错误
    producer.simulateError("Authentication failed: Token expired");
    producer.simulateError("CRITICAL: All brokers unreachable");
    producer.simulateError("Request timeout after 30000ms");
    
    std::cout << "\n生产环境错误处理完成" << std::endl;
}

/**
 * @brief 主函数 - 运行所有示例
 */
int main() {
    std::cout << "Kafka错误回调函数使用示例" << std::endl;
    std::cout << "================================" << std::endl;
    
    try {
        // 运行所有示例
        example1_basic_usage();
        example2_custom_handling();
        example3_error_frequency_monitoring();
        example4_different_severity_levels();
        example5_production_integration();
        
        std::cout << "\n=== 最终统计报告 ===" << std::endl;
        std::cout << kafka::getKafkaErrorStatistics() << std::endl;
        
        std::cout << "\n✅ 所有示例运行完成！" << std::endl;
        
    } catch (const std::exception& e) {
        std::cerr << "示例运行异常: " << e.what() << std::endl;
        return 1;
    }
    
    return 0;
}

/**
 * @brief 编译和运行说明
 * 
 * 编译命令:
 * g++ -std=c++17 -I../include \
 *     kafka_error_callback_example.cpp \
 *     ../src/common/kafka/kafka_error_handler.cpp \
 *     ../src/common/logger/logger.cpp \
 *     -o kafka_error_example \
 *     -pthread
 * 
 * 运行:
 * ./kafka_error_example
 * 
 * 预期输出:
 * - 各种错误类型的分类和处理
 * - 错误严重级别的判断
 * - 自定义处理逻辑的执行
 * - 错误统计信息的展示
 * - 生产环境集成的演示
 */
